package quewriter

import (
	"errors"
	"fmt"
	"gitee.com/ymofen/gobase"
	"gitee.com/ymofen/panicsafe"
	"io"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// 队列写入, 启用go 进行写入
type QueueWriter struct {
	closedflag   int32
	maxcachel    int32
	copybuf      int8
	timeout      time.Duration
	wg           sync.WaitGroup
	w            io.Writer
	obound       chan []byte
	OnAfterWrite func(p []byte, n int, err error)
}

var (
	queueCacheBufSize int32
	queueWriterAliveN int32
	queueRunN         int32
)

type WriteWrapper struct {
	writefunc func(buf []byte) (n int, err error)
}

func GetQueueWriterStatus() string {
	return fmt.Sprintf("alive-n:%d, run:%d, cachebuf:%d", atomic.LoadInt32(&queueWriterAliveN), atomic.LoadInt32(&queueRunN), atomic.LoadInt32(&queueCacheBufSize))
}

func NewWriteWrapper(fn func(buf []byte) (n int, err error)) *WriteWrapper {
	return &WriteWrapper{writefunc: fn}
}

func (this *WriteWrapper) Write(buf []byte) (n int, err error) {
	return this.writefunc(buf)
}

// maxcachel: 最大允许缓存长度, 0:不限制, 压入时会进行阻塞
func NewQueueWriter(w io.Writer, maxcachel int32) *QueueWriter {
	que := &QueueWriter{w: w,
		obound:    make(chan []byte, maxcachel),
		maxcachel: maxcachel,
		timeout:   time.Second,
	}
	atomic.AddInt32(&queueWriterAliveN, 1)
	runtime.SetFinalizer(que, func(obj interface{}) {
		atomic.AddInt32(&queueWriterAliveN, -1)
	})
	que.copybuf = 1
	que.wg.Add(1)
	go que.start()
	return que
}

func (this *QueueWriter) RequestDis(reason string) {
	this.Close()
}

func (this *QueueWriter) SetCopyBuf(flag int8) {
	this.copybuf = flag
}

func (this *QueueWriter) Close() error {
	if atomic.CompareAndSwapInt32(&this.closedflag, 0, 1) {
		select {
		case this.obound <- nil:
			return nil
		case <-time.After(this.timeout):
			return errors.New("push to chan timeout!")
		}
	}
	return nil
}

func (this *QueueWriter) Wait() {
	this.wg.Wait()
}

func (this *QueueWriter) start() {
	if panicsafe.GoFunCatchException {
		defer panicsafe.DeferCatchPanic()
	}
	atomic.AddInt32(&queueRunN, 1)
	defer atomic.AddInt32(&queueRunN, -1)
	defer this.wg.Done()
breakfor:
	for {
		select {
		case buf := <-this.obound:
			if buf == nil {
				break breakfor
			}
			atomic.AddInt32(&queueCacheBufSize, -int32(len(buf)))
			if atomic.LoadInt32(&this.closedflag) == 0 {
				n, err := this.w.Write(buf)
				evt := this.OnAfterWrite
				if evt != nil {
					evt(buf, n, err)
				}
			}
		case <-time.After(this.timeout):
			if atomic.LoadInt32(&this.closedflag) == 1 {
				break breakfor
			}
		}
	}
	this.w = nil
}

func (this *QueueWriter) Write(buf []byte) (n int, err error) {
	if len(buf) == 0 {
		return 0, nil
	}
	if atomic.LoadInt32(&this.closedflag) == 1 {
		return -1, io.ErrClosedPipe
	}
	newBuf := gobase.CloneBytes(buf, 0, 0)
	n = len(newBuf)
	select {
	case this.obound <- newBuf:
		atomic.AddInt32(&queueCacheBufSize, int32(n))
		return n, nil
	case <-time.After(this.timeout):
		return -1, errors.New("QueueWriter:push to chan timeout!")
	}
}
